-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DO NOT MERGE] Refactor - Move command execution responsibility to CommandHandler from IOThread #1358
base: master
Are you sure you want to change the base?
Conversation
@soumya-codes @lucifercr07 - please review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code changes look great Prateek. I have done a lot of nit picking around naming convention. Please feel free to open a separate PR to address them.
@@ -136,6 +136,7 @@ type performance struct { | |||
ShardCronFrequency time.Duration `config:"shard_cron_frequency" default:"1s"` | |||
MultiplexerPollTimeout time.Duration `config:"multiplexer_poll_timeout" default:"100ms"` | |||
MaxClients int32 `config:"max_clients" default:"20000" validate:"min=0"` | |||
MaxCmdHandlers int32 `config:"max_cmd_handlers" default:"20000" validate:"min=0"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this same as MaxClients?
Also since we do not plan to have more than 65536 client's can we have the type as int16
instead?
"github.com/dicedb/dice/internal/shard" | ||
) | ||
|
||
type Manager struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given the behaviour, I think we got naming convention for Manager wrong in DiceDB. What do you think about Registrar/Registry?
func (m *Manager) UnregisterCommandHandler(id string) error { | ||
m.ShardManager.UnregisterCommandHandler(id) | ||
if cmdHandler, loaded := m.activeCmdHandlers.LoadAndDelete(id); loaded { | ||
ch := cmdHandler.(*BaseCommandHandler) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use a comma, ok idiom here to be safe. Not needed here, but it will be failproof for future code changes.
id string | ||
parser requestparser.Parser | ||
shardManager *shard.ShardManager | ||
adhocReqChan chan *cmd.DiceDBCmd |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since you are refactoring this I would request you to change the name of adhocReqChan
field to something more meaningful like watchReqChan
.
} | ||
|
||
type BaseCommandHandler struct { | ||
CommandHandler |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make sure the structures are aligned in a way that they add lowest memory footprint..
@@ -12,7 +12,7 @@ import ( | |||
|
|||
// RespAuth returns with an encoded "OK" if the user is authenticated | |||
// If the user is not authenticated, it returns with an encoded error message | |||
func (t *BaseIOThread) RespAuth(args []string) interface{} { | |||
func (h *BaseCommandHandler) RespAuth(args []string) interface{} { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest we move this to the commandhandler.go file.
const defaultRequestTimeout = 6 * time.Second | ||
|
||
var requestCounter uint32 | ||
|
||
// IOThread interface | ||
type IOThread interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the Handler
naming convention. It follows Go naming convention, where the data-structure names are encouraged to be verbs name. Based on this should we rename IOThread to IOHandler?
@@ -21,10 +18,9 @@ var ( | |||
ErrIOThreadNotFound = errors.New("io-thread not found") | |||
) | |||
|
|||
func NewManager(maxClients int32, sm *shard.ShardManager) *Manager { | |||
func NewManager(maxClients int32) *Manager { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as earlier, what do you think of naming Manager to Registar/Registry? This name provides the precise nature of the work done by this type.
@@ -104,15 +104,16 @@ func (manager *ShardManager) GetShard(id ShardID) *ShardThread { | |||
return nil | |||
} | |||
|
|||
// RegisterIOThread registers a io-thread with all Shards present in the ShardManager. | |||
func (manager *ShardManager) RegisterIOThread(id string, request, processing chan *ops.StoreResponse) { | |||
// RegisterCommandHandler registers a command handler with all Shards present in the ShardManager. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as earlier, what do you think of naming Manager to Registar/Registry? This name provides the precise nature of the work done by this type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, minor comments.
func GenerateUniqueIOThreadID() string { | ||
count := atomic.AddUint64(&ioThreadCounter, 1) | ||
timestamp := time.Now().UnixNano()/int64(time.Millisecond) - startTime | ||
return fmt.Sprintf("W-%d-%d", timestamp, count) | ||
} | ||
|
||
func GenerateUniqueCommandHandlerID() string { | ||
count := atomic.AddUint64(&cmdHandlerCounter, 1) | ||
timestamp := time.Now().UnixNano()/int64(time.Millisecond) - startTime | ||
return fmt.Sprintf("W-%d-%d", timestamp, count) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we optimise this as below? Also any reasons why we're keeping ID format for cmdHandler and IOThread same?
func GenerateUniqueID(prefix string, counter *uint64) string {
count := atomic.AddUint64(counter, 1)
timestamp := time.Now().UnixMilli() - startTime
return fmt.Sprintf("%s-%d-%d", prefix, timestamp, count)
}
func GenerateUniqueIOThreadID() string {
return GenerateUniqueID("W", &ioThreadCounter)
}
func GenerateUniqueCommandHandlerID() string {
return GenerateUniqueID("W", &cmdHandlerCounter)
}
m.mu.Lock() | ||
defer m.mu.Unlock() | ||
|
||
if m.CommandHandlerCount() >= m.maxCmdHandlers { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When this returns error we've already registered an iothread, may be we can move this check earlier?
Also this'd initiate a server shutdown I believe would that be okay? Shouldn't we just drop the new client gracefully and continue server ops?
|
||
type Manager struct { | ||
activeCmdHandlers sync.Map | ||
numCmdHandlers atomic.Int32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we use atomic.Uint32
instead?
type Manager struct { | ||
activeCmdHandlers sync.Map | ||
numCmdHandlers atomic.Int32 | ||
maxCmdHandlers int32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above uint32
?
if responseChan != nil && preprocessingChan != nil { | ||
m.ShardManager.RegisterCommandHandler(cmdHandler.ID(), responseChan, preprocessingChan) // TODO: Change responseChan type to ShardResponse | ||
} else if responseChan != nil && preprocessingChan == nil { | ||
m.ShardManager.RegisterCommandHandler(cmdHandler.ID(), responseChan, nil) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we change it to?
m.ShardManager.RegisterCommandHandler(cmdHandler.ID(),cmdHandler.responseChan,cmdHandler.preprocessingChan)
responseChan := cmdHandler.responseChan | ||
preprocessingChan := cmdHandler.preprocessingChan |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May be we can have checks in handler registration to mandate responseChan
presence?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes, minor comments.
@@ -212,10 +213,12 @@ func RunTestServer(wg *sync.WaitGroup, opt TestServerOptions) { | |||
cmdWatchSubscriptionChan := make(chan watchmanager.WatchSubscription) | |||
gec := make(chan error) | |||
shardManager := shard.NewShardManager(1, queryWatchChan, cmdWatchChan, gec) | |||
ioThreadManager := iothread.NewManager(20000, shardManager) | |||
ioThreadManager := iothread.NewManager(20000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use a config for this number.
|
||
// decomposeCommand is a function that takes a DiceDB command and breaks it down into smaller, | ||
// manageable DiceDB commands for each shard processing. It returns a slice of DiceDB commands. | ||
decomposeCommand func(ctx context.Context, thread *BaseIOThread, DiceDBCmd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) | ||
decomposeCommand func(ctx context.Context, h *BaseCommandHandler, DiceDBCmd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think these methods could just be defined on the BaseCommandHandler? We're passing a pointer to the command handler in each of these methods and I'm wondering if we could avoid that.
@@ -54,12 +54,12 @@ func preProcessRename(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error { | |||
// preProcessCopy prepares the COPY command for preprocessing by sending a GET command | |||
// to retrieve the value of the original key. The retrieved value is used later in the | |||
// decomposeCopy function to copy the value to the destination key. | |||
func customProcessCopy(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error { | |||
func customProcessCopy(h *BaseCommandHandler, diceDBCmd *cmd.DiceDBCmd) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as the other comment, is it possible to define these methods on the command handler struct?
} else if responseChan != nil && preprocessingChan == nil { | ||
m.ShardManager.RegisterCommandHandler(cmdHandler.ID(), responseChan, nil) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe this can be placed before this if else condition and each of the two channels can be checked and initialized separately. It would avoid errors where either one of the two channels is somehow uninitialized and doesn't fit into the existing if-else logic.
TODO
[ ] Investigate CopyTest failure
[ ] Investigate SDK tests failure
This PR is a preparatory step towards solving nearly 50-60% CPU utilisation due to context shift between user space and kernel space for frequent read and write calls.
As first step, this PR moves command execution logic out of IOThread to a new entity
CommandHandler
. IOThread is now responsible for reading from and writing to the client connection, and CommandHandler is responsible for execution of command by interacting with ShardManager and WatchManager.Currently, there is 1:1 mapping between IOThread and CommandHandler, and both of them are spawned by Resp Server. IOThread and CommandHandler communicate using 3 channels
ioThreadReadChan
- to send command from IOThread to CommandHandlerioThreadWriteChan
- to send response from CommandHandler to IOThreadioThreadErrChan
- to send connection error signal from IOThread to CommandHandler, so that CommandHandler shuts down when IOHandler does